/*
 * Copyright (C) 2017 Intel Corporation.  All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 */


// Copyright (c) WanSheng Intelligent Corp. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.


#undef LOG_TAG
#define LOG_TAG     "TASK"

#include "agent_core_lib.h"
#include "wasdk_internal_consts.h"
#include <linux/prctl.h>


/*
 * State of one task scheduler instance.
 */
typedef struct _task_ctx
{
    /* Locked by the queue functions in this file while they read or modify
       g_task_list and stats.g_total_tasks. */
    pthread_mutex_t taskqueue_condition_mutex;

    /* Signalled when a task is queued and when shutdown is requested;
       bh_wait_for_task() sleeps on it. */
    pthread_cond_t  taskqueue_condition_cond;

    /* Attributes used to create the condition variable (the init code asks
       for CLOCK_MONOTONIC). */
    pthread_condattr_t cond_attr;

    /* Head of the singly linked list of pending tasks, kept in ascending
       next_execution order by bh_schedule_task(). */
    wa_task_t g_task_list;

    /* Task whose handler is currently being invoked by bh_execute_task(),
       NULL otherwise. */
    wa_task_t g_current_task;

    /* Counters; g_total_tasks tracks how many tasks are queued. */
    bh_task_stats_t stats;

    /* Polling thread handle, filled in by pthread_create(). */
    pthread_t tid;

    /* Set while the polling thread runs; bh_task_destroy_scheduler() uses it
       to decide whether a thread has to be joined. */
    bool running;

    /* Set to ask the polling loop to stop. */
    bool exit;
} task_ctx_t;


/*
 * Copy the scheduler's statistics into a caller-provided structure.
 *
 * task_ctx: scheduler whose counters are read.
 * stats:    destination; receives a by-value copy of the counters.
 *
 * The copy is taken without locking the queue mutex.
 */
void bh_get_task_stats(wa_task_scheduler_t task_ctx,bh_task_stats_t * stats)
{
    const task_ctx_t * scheduler = (const task_ctx_t *) task_ctx;

    *stats = scheduler->stats;
}


/*
 * Allocate and initialise a scheduler context. No thread is started.
 *
 * The context is zero-filled (empty task list, flags cleared), then the
 * condition attribute, mutex and condition variable are created. The
 * condition variable is asked to measure timeouts against CLOCK_MONOTONIC.
 *
 * Returns the new scheduler, or NULL when the allocation or the creation of
 * the condition attribute, mutex or condition variable fails. On failure
 * everything that was already set up is released again.
 */
wa_task_scheduler_t bh_init_task_scheduler()
{
    task_ctx_t * task_ctx = (task_ctx_t*) calloc(1, sizeof(*task_ctx));
    if(task_ctx == NULL) return NULL;

    errno = pthread_condattr_init (&task_ctx->cond_attr);
    if (errno) {
        perror ("pthread_condattr_init");
        goto fail_free;
    }

    errno = pthread_condattr_setclock (&task_ctx->cond_attr, CLOCK_MONOTONIC);
    if (errno) {
        /* Kept as best effort: the failure is reported and setup continues.
           NOTE(review): bh_wait_for_task() builds its deadline from
           CLOCK_MONOTONIC, so with the default clock the timed wait would be
           measured against a different time base - confirm whether this
           should be treated as fatal. */
        perror ("pthread_condattr_setclock");
    }

    errno = pthread_mutex_init(&task_ctx->taskqueue_condition_mutex, NULL);
    if (errno) {
        perror ("pthread_mutex_init");
        goto fail_condattr;
    }

    errno = pthread_cond_init(&task_ctx->taskqueue_condition_cond, &task_ctx->cond_attr);
    if (errno) {
        perror ("pthread_cond_init");
        goto fail_mutex;
    }

    return task_ctx;

    /* Unwind in reverse order of construction. */
fail_mutex:
    pthread_mutex_destroy(&task_ctx->taskqueue_condition_mutex);
fail_condattr:
    pthread_condattr_destroy(&task_ctx->cond_attr);
fail_free:
    free(task_ctx);
    return NULL;
}


/*
 * Create a task object. The task is not queued; pass it to
 * bh_schedule_task() to have it executed.
 *
 * task_name:          name of the task; must not be NULL. A private copy is
 *                     stored in task_type.
 * data:               opaque pointer stored in task_data.
 * auto_free_data:     when true, bh_delete_task() calls free() on data.
 * repeat_duration_ms: stored as repeat_interval_msecs; the polling code
 *                     treats 0 as "run once" and re-queues otherwise.
 * task_handler:       callback invoked when the task is executed.
 *
 * Returns the new task, or NULL if memory for the task or for the copy of
 * its name cannot be allocated. On failure `data` is left untouched, even
 * when auto_free_data is true.
 */
wa_task_t bh_new_task(char* task_name, void * data, bool auto_free_data, int repeat_duration_ms, bh_task_handler task_handler)
{
    /* Validate before allocating so a failed assertion owns nothing. */
    assert(task_name != NULL);

    wa_task_t task = (wa_task_t) calloc(1, sizeof(*task));
    if(task == NULL) return NULL;

    task->task_type = strdup(task_name);
    if(task->task_type == NULL)
    {
        /* A task without a name would later be handed to strcmp() and to
           "%s" formatting, so refuse to create it. */
        free(task);
        return NULL;
    }

    task->task_data = data;
    task->repeat_interval_msecs = repeat_duration_ms;
    task->handler = (bh_task_handler) task_handler;
    task->task_data_is_allocated = auto_free_data;

    TraceV(LOG_FLAG_TASK,"bh_new_task:type:%s,task:%p, data: %p",
            task->task_type,task, data);

    return task;
}

/*
 * Destroy a task that is no longer linked into any scheduler queue.
 *
 * Steps, in order: invoke the task's close_handler if one is set, free
 * task_data when the task was created with auto-free enabled, free the
 * copied task name, and finally free the task object itself.
 *
 * task: task to destroy; it is dereferenced unconditionally.
 */
void bh_delete_task(wa_task_t task)
{
    if(task->close_handler != NULL)
    {
        task->close_handler(task);
    }

    if(task->task_data_is_allocated)
    {
        /* free(NULL) is a no-op, so no separate NULL test is needed */
        free(task->task_data);
    }

    /* log while the name is still alive */
    TraceV(LOG_FLAG_TASK,"free task [%s], task:%p\n",task->task_type, task);

    free(task->task_type);
    free(task);
}

/*
 * Queue a task to run `msecs_from_now` milliseconds from the current tick.
 *
 * The pending list is kept sorted by ascending next_execution. A task whose
 * time is not later than the current head becomes the new head; otherwise it
 * is placed after every queued task whose time is not later than its own.
 * The queue counter is incremented and the condition variable is signalled
 * on every call.
 *
 * ctx:            scheduler that receives the task.
 * task:           task to queue; its next and next_execution fields are
 *                 overwritten.
 * msecs_from_now: delay added to bh_get_tick_ms().
 *
 * Always returns NULL.
 */
wa_task_t bh_schedule_task(wa_task_scheduler_t ctx, wa_task_t task, int msecs_from_now)
{
    task->next_execution = bh_get_tick_ms() + msecs_from_now;
    TraceV(LOG_FLAG_TASK,"schedule_task:type:%s,time:%d, task:%p\n",
            task->task_type,task->next_execution ,task);

    pthread_mutex_lock(&ctx->taskqueue_condition_mutex);

    wa_task_t head = ctx->g_task_list;

    if(head == NULL || task->next_execution <= head->next_execution)
    {
        /* empty queue, or not later than the current head: new head */
        task->next = head;
        ctx->g_task_list = task;
    }
    else
    {
        wa_task_t prev;

        assert(head->next_execution < task->next_execution);

        /* advance to the last node that is not later than the new task */
        for(prev = head;
            prev->next != NULL && prev->next->next_execution <= task->next_execution;
            prev = prev->next)
        {
        }

        task->next = prev->next;
        prev->next = task;
    }

    ctx->stats.g_total_tasks ++;

    /* wake a thread sleeping in bh_wait_for_task() so it re-evaluates the queue */
    pthread_cond_signal( &ctx->taskqueue_condition_cond );

    pthread_mutex_unlock(&ctx->taskqueue_condition_mutex);

    if(LOGV_SET(FLAG_LIB_DEBUG_TASK))bh_dump_tasks(ctx, log_get_handle(), __FUNCTION__);

    return NULL;
}


/*
 * Run a task's handler once.
 *
 * While the handler executes, ctx->g_current_task points at the task; it is
 * reset to NULL afterwards. The task's exec_count is incremented after the
 * handler returns. The queue mutex is not taken here.
 *
 * Returns whatever the handler returned.
 */
int bh_execute_task(wa_task_scheduler_t ctx, wa_task_t task) // ?
{
    ctx->g_current_task = task;
    TraceV(LOG_FLAG_TASK,"execute_task:type:%s,time:%d, task:%p",
            task->task_type,task->next_execution, task);

    int handler_result = task->handler(task);
    task->exec_count ++;

    ctx->g_current_task = NULL;

    return handler_result;
}


/*
 * Report how long until the earliest queued task is due.
 *
 * Returns -1 when the queue is empty, 0 when the earliest task is already
 * due, otherwise the number of milliseconds between the current tick and
 * the earliest task's next_execution.
 */
int bh_next_task_ms(wa_task_scheduler_t ctx)
{
    int remaining_ms = -1;      /* result for an empty queue */

    pthread_mutex_lock(&ctx->taskqueue_condition_mutex);

    tick_time_t  current_tick = bh_get_tick_ms();
    wa_task_t earliest = ctx->g_task_list;

    if(earliest != NULL)
    {
        if(earliest->next_execution <= current_tick)
            remaining_ms = 0;
        else
            remaining_ms = earliest->next_execution - current_tick;
    }

    pthread_mutex_unlock(&ctx->taskqueue_condition_mutex);

    return remaining_ms;
}



// Pop the earliest task if it is due.
//
// - Queue empty:        returns NULL, *msecs_to_first_task = 120 * 1000.
// - Earliest not due:   returns NULL, *msecs_to_first_task = time left until
//                       it is due.
// - Earliest is due:    unlinks it, decrements the queue counter and returns
//                       it; *msecs_to_first_task is left unchanged.
//
// The caller owns the returned task.
wa_task_t bh_check_task(wa_task_scheduler_t ctx, int * msecs_to_first_task)
{
    wa_task_t due = NULL;

    pthread_mutex_lock(&ctx->taskqueue_condition_mutex);

    tick_time_t  current_tick = bh_get_tick_ms();
    wa_task_t earliest = ctx->g_task_list;

    if(earliest == NULL)
    {
        *msecs_to_first_task = 120 * 1000;
    }
    else if(earliest->next_execution > current_tick)
    {
        *msecs_to_first_task = earliest->next_execution - current_tick;
    }
    else
    {
        due = earliest;
        ctx->g_task_list = earliest->next;
        ctx->stats.g_total_tasks --;
    }

    pthread_mutex_unlock(&ctx->taskqueue_condition_mutex);

    if(LOGV_SET(FLAG_LIB_DEBUG_TASK) && due) bh_dump_tasks(ctx, log_get_handle(),__FUNCTION__);
    return due;
}

/*
 * Sleep on the scheduler's condition variable for at most `milli_seconds`.
 *
 * The deadline is computed from CLOCK_MONOTONIC. The call returns when the
 * condition variable is signalled, when the deadline passes, or on any other
 * return of pthread_cond_timedwait(); the result of the wait is not
 * reported, so callers must re-check the queue themselves.
 *
 * If the scheduler's exit flag is already set, no wait is started and the
 * call returns immediately, so a shutdown request is not slept through for
 * the whole timeout.
 */
void bh_wait_for_task(wa_task_scheduler_t ctx, int milli_seconds)
{
    pthread_mutex_lock(&ctx->taskqueue_condition_mutex);

    /* checked with the mutex held, immediately before blocking */
    if(!ctx->exit)
    {
        struct timespec timeToWait;
        clock_gettime (CLOCK_MONOTONIC, &timeToWait);

        timespec_add_ms(&timeToWait, milli_seconds);

        pthread_cond_timedwait(&ctx->taskqueue_condition_cond, &ctx->taskqueue_condition_mutex, &timeToWait);
    }

    pthread_mutex_unlock(&ctx->taskqueue_condition_mutex);

}



// Unlink the first queued task whose task_data equals data_ptr.
//
// Returns the unlinked task (now owned by the caller, not freed here), or
// NULL when no queued task carries that pointer. The queue counter is
// decremented when a task is removed.
wa_task_t bh_remove_task_by_data(wa_task_scheduler_t ctx, void * data_ptr)
{
    wa_task_t found = NULL;
    wa_task_t before = NULL;    /* node preceding the one under inspection */

    pthread_mutex_lock(&ctx->taskqueue_condition_mutex);

    for(wa_task_t node = ctx->g_task_list; node != NULL; before = node, node = node->next)
    {
        if(node->task_data != data_ptr)
            continue;

        ctx->stats.g_total_tasks --;

        if(before == NULL)
            ctx->g_task_list = node->next;      /* match is the head */
        else
            before->next = node->next;

        found = node;
        break;
    }

    pthread_mutex_unlock(&ctx->taskqueue_condition_mutex);

    if(LOGV_SET(FLAG_LIB_DEBUG_TASK) && found) bh_dump_tasks(ctx, log_get_handle(),__FUNCTION__);

    return found;
}


// Unlink the first queued task whose name (task_type) equals `name`.
//
// Returns the unlinked task (now owned by the caller, not freed here), or
// NULL when `name` is NULL or no queued task has that name. Queued tasks
// without a name are skipped. The queue counter is decremented when a task
// is removed.
wa_task_t bh_remove_task_by_name(wa_task_scheduler_t ctx, char * name)
{
    wa_task_t ret = NULL;

    /* strcmp() on a NULL pointer is undefined; nothing can match anyway */
    if(name == NULL)
        return NULL;

    pthread_mutex_lock(&ctx->taskqueue_condition_mutex);

    wa_task_t t = ctx->g_task_list;
    wa_task_t prev = NULL;
    while(t)
    {
        if(t->task_type != NULL && strcmp(t->task_type, name) == 0)
        {
            ctx->stats.g_total_tasks --;
            if(prev)
            {
                prev->next = t->next;
            }
            else
            {
                /* match is the head of the list */
                ctx->g_task_list = t->next;
            }
            ret = t;
            break;
        }
        prev = t;
        t = t->next;
    }

    pthread_mutex_unlock(&ctx->taskqueue_condition_mutex);

    if(LOGV_SET(FLAG_LIB_DEBUG_TASK) && ret) bh_dump_tasks(ctx, log_get_handle(), __FUNCTION__);

    return ret;
}


/*
 * Remove and destroy every queued task whose name equals `name`.
 *
 * Each matching task is unlinked with bh_remove_task_by_name() and then
 * released with bh_delete_task(). Because this function hands no task
 * pointer back to the caller, an unlinked task that was not destroyed here
 * could never be freed.
 *
 * Returns the number of tasks removed.
 */
int bh_remove_all_tasks_by_name(wa_task_scheduler_t task_ctx, char * name)
{
    int cnt = 0;
    wa_task_t removed;

    while((removed = bh_remove_task_by_name(task_ctx, name)) != NULL)
    {
        /* destroyed outside the queue mutex: the remove call has already
           unlocked it by the time it returns */
        bh_delete_task(removed);
        cnt ++;
    }

    return cnt;
}


/*
 * Move the first queued task named `task_name` to a new due time.
 *
 * The task is unlinked by name and queued again to run `ms` milliseconds
 * from now.
 *
 * Returns true when such a task was found and re-queued, false when no
 * queued task has that name.
 */
bool bh_reschedule_task(wa_task_scheduler_t task_ctx, char * task_name, int ms)
{
    wa_task_t found = bh_remove_task_by_name(task_ctx, task_name);

    if(found != NULL)
    {
        bh_schedule_task(task_ctx, found, ms);
        return true;
    }

    return false;
}


/*
 * Write a human-readable listing of the pending tasks to `fp`.
 *
 * Output is a header line containing `tag`, one line per queued task (in
 * queue order) and a footer. The queue mutex is held only while the list is
 * walked; header and footer are written outside the lock.
 */
void bh_dump_tasks(wa_task_scheduler_t ctx, FILE * fp, const char * tag)
{
    fprintf(fp, "DUMP TASKS [%s]----\n", tag);

    pthread_mutex_lock(&ctx->taskqueue_condition_mutex);

    for(wa_task_t node = ctx->g_task_list; node != NULL; node = node->next)
    {
        fprintf(fp, "  [%p] %s: execs:%d, interval:%d, alloc:%d, data:%p\n",
                node, node->task_type,
                node->exec_count, node->repeat_interval_msecs,
                node->task_data_is_allocated,
                node->task_data);
    }

    pthread_mutex_unlock(&ctx->taskqueue_condition_mutex);

    fprintf(fp, "DUMP TASKS ----end\n\n");
}


/*
 * Run every task that is currently due, on the calling thread.
 *
 * Due tasks are popped one at a time and executed. A task with a zero
 * repeat interval is destroyed after it ran; any other task is queued again
 * with its repeat interval as the delay. The loop ends when the queue is
 * empty or its earliest task is not yet due.
 *
 * Returns the value bh_check_task() stored on its final call: the time in
 * milliseconds until the earliest remaining task, or 120 * 1000 when the
 * queue is empty.
 */
int task_check_expiry_ms(wa_task_scheduler_t task_ctx )
{
    int wait_ms = -1;

    for(wa_task_t due = bh_check_task(task_ctx, &wait_ms);
        due != NULL;
        due = bh_check_task(task_ctx, &wait_ms))
    {
        bh_execute_task(task_ctx, due);

        if(due->repeat_interval_msecs != 0)
            bh_schedule_task(task_ctx, due, due->repeat_interval_msecs);
        else
            bh_delete_task(due);
    }

    return wait_ms;
}


/*
 * Thread entry point of the scheduler's polling loop.
 *
 * param: the scheduler context (task_ctx_t *).
 *
 * Names the thread "task-polling", marks the context as running, then loops
 * until the context's exit flag is set. Each iteration either executes one
 * due task (destroying it when its repeat interval is 0, re-queuing it
 * otherwise) or, when nothing is due, sleeps in bh_wait_for_task() for the
 * time reported by bh_check_task(). The running flag is cleared before the
 * thread returns.
 *
 * Always returns NULL.
 */
void * thread_task_polling(void * param)
{
    task_ctx_t * scheduler = (task_ctx_t*) param;
    int wait_ms = 0;

    prctl (PR_SET_NAME, "task-polling");
    scheduler->running = true;
    WARNING("enter thread_task_polling thread\n");

    while(!scheduler->exit)
    {
        wa_task_t due = bh_check_task(scheduler, &wait_ms);

        if(due == NULL)
        {
            /* nothing due yet: sleep until the next task or a wake-up */
            bh_wait_for_task(scheduler, wait_ms);
            continue;
        }

        bh_execute_task(scheduler, due);

        if(due->repeat_interval_msecs != 0)
            bh_schedule_task(scheduler, due, due->repeat_interval_msecs);
        else
            bh_delete_task(due);
    }

    scheduler->running = false;
    WARNING("Exit thread_task_polling thread\n");
    return NULL;
}

wa_task_scheduler_t bh_task_run_new_scheduler()
{
    wa_task_scheduler_t scheduler = bh_init_task_scheduler();
    task_ctx_t * ctx = (task_ctx_t*) scheduler;
    pthread_create(&ctx->tid, NULL, thread_task_polling, ctx);
    return scheduler;
}


/*
 * Stop a scheduler and release everything it owns.
 *
 * If the running flag is set, the exit flag is raised, the condition
 * variable is signalled and the polling thread is joined. Every task still
 * queued is then destroyed with bh_delete_task(), the mutex, condition
 * variable and condition attribute are destroyed, and the context is freed.
 * The scheduler must not be used after this call.
 *
 * task_ctx: scheduler to destroy; NULL is accepted and ignored.
 */
void bh_task_destroy_scheduler(wa_task_scheduler_t task_ctx)
{
    task_ctx_t * ctx = task_ctx;

    if(ctx == NULL)
        return;

    if(ctx->running != 0)
    {
        /* Raise the flag and signal with the queue mutex held, so the
           request is serialised against code that holds the same mutex
           while deciding to sleep on the condition variable. */
        pthread_mutex_lock(&ctx->taskqueue_condition_mutex);
        ctx->exit = true;
        pthread_cond_signal( &ctx->taskqueue_condition_cond );
        pthread_mutex_unlock(&ctx->taskqueue_condition_mutex);

        if(pthread_join(ctx->tid, NULL) != 0)
        {
            ERROR("bh_task_stop_scheduler still proceeding");
        }
    }

    /* destroy whatever is still queued */
    pthread_mutex_lock(&ctx->taskqueue_condition_mutex);
    wa_task_t t = ctx->g_task_list;
    while(t)
    {
        wa_task_t current = t;
        t = t->next;            /* read the link before the node is freed */
        bh_delete_task(current);
    }
    ctx->g_task_list = NULL;
    pthread_mutex_unlock(&ctx->taskqueue_condition_mutex);

    pthread_mutex_destroy(&ctx->taskqueue_condition_mutex);
    pthread_cond_destroy(&ctx->taskqueue_condition_cond);
    pthread_condattr_destroy(&ctx->cond_attr);

    free(ctx);
}
